嗨嗨!大家好!歡迎來到 Rust 三十天挑戰的第十七天!
昨天我們征服了多執行緒併發程式設計,學會了如何讓多個執行緒協同工作。今天我們要探索另一種併發模式:非同步程式設計!
如果說多執行緒是「雇用更多工人同時工作」,那麼非同步程式設計就像是「讓一個聰明的工人在等待時去做別的事情」。在處理 I/O 密集型任務(如網路請求、檔案讀寫、資料庫查詢)時,非同步程式設計往往比多執行緒更有效率!
在我學習 C# 的時候,async/await
徹底改變了我對併發的理解。在 Rust 中,非同步程式設計同樣重要,特別是對於:
讓我們先看看傳統同步程式碼的問題:
use std::time::Duration;
use std::thread;
// 模擬同步的網路請求
fn sync_fetch_data(url: &str) -> String {
println!("開始請求: {}", url);
thread::sleep(Duration::from_secs(2)); // 模擬網路延遲
println!("完成請求: {}", url);
format!("來自 {} 的資料", url)
}
fn main() {
let start = std::time::Instant::now();
// 同步執行三個請求
let data1 = sync_fetch_data("https://api1.example.com");
let data2 = sync_fetch_data("https://api2.example.com");
let data3 = sync_fetch_data("https://api3.example.com");
println!("所有資料:{}, {}, {}", data1, data2, data3);
println!("總耗時:{:.2}秒", start.elapsed().as_secs_f64());
}
這個程式會依序執行三個請求,總共需要 6 秒!但實際上,我們可以同時發送這些請求。
在 Rust 中,async/await
語法是語言內建的,但我們需要一個執行環境 (runtime) 來實際執行非同步任務。Tokio
是目前最受歡迎的選擇!
首先,讓我們在專案中新增 Tokio:
cargo new async_example
cd async_example
cargo add tokio --features full
或者在 Cargo.toml
中手動新增:
[dependencies]
tokio = { version = "1.0", features = ["full"] }
讓我們重寫剛才的同步範例:
use tokio::time::{sleep, Duration};
// 非同步函式使用 async 關鍵字
async fn async_fetch_data(url: &str) -> String {
println!("開始請求: {}", url);
sleep(Duration::from_secs(2)).await; // 使用 .await 等待
println!("完成請求: {}", url);
format!("來自 {} 的資料", url)
}
#[tokio::main] // Tokio 的 main 函式標記
async fn main() {
let start = std::time::Instant::now();
// 並行執行三個請求
let (data1, data2, data3) = tokio::join!(
async_fetch_data("https://api1.example.com"),
async_fetch_data("https://api2.example.com"),
async_fetch_data("https://api3.example.com")
);
println!("所有資料:{}, {}, {}", data1, data2, data3);
println!("總耗時:{:.2}秒", start.elapsed().as_secs_f64());
}
執行這個程式,你會發現總耗時約為 2 秒,而不是 6 秒!三個請求幾乎同時執行。
// async 函式實際上回傳一個 Future
async fn example() -> i32 {
42
}
// 等價於
fn example_desugared() -> impl std::future::Future<Output = i32> {
async { 42 }
}
use tokio::time::{sleep, Duration};
async fn step1() -> String {
sleep(Duration::from_millis(100)).await;
"步驟一完成".to_string()
}
async fn step2() -> String {
sleep(Duration::from_millis(200)).await;
"步驟二完成".to_string()
}
async fn sequential_example() {
println!("開始依序執行");
let result1 = step1().await; // 等待步驟一
println!("{}", result1);
let result2 = step2().await; // 等待步驟二
println!("{}", result2);
println!("依序執行完成");
}
async fn concurrent_example() {
println!("開始並行執行");
// 同時啟動兩個任務
let task1 = step1();
let task2 = step2();
// 等待兩個任務都完成
let (result1, result2) = tokio::join!(task1, task2);
println!("{}", result1);
println!("{}", result2);
println!("並行執行完成");
}
#[tokio::main]
async fn main() {
sequential_example().await;
println!("---");
concurrent_example().await;
}
use tokio::time::{sleep, Duration};
async fn background_task(id: u32) {
for i in 1..=3 {
println!("背景任務 {} - 步驟 {}", id, i);
sleep(Duration::from_millis(500)).await;
}
println!("背景任務 {} 完成", id);
}
#[tokio::main]
async fn main() {
println!("啟動背景任務");
// 啟動三個獨立的背景任務
let handle1 = tokio::spawn(background_task(1));
let handle2 = tokio::spawn(background_task(2));
let handle3 = tokio::spawn(background_task(3));
// 主任務的工作
for i in 1..=5 {
println!("主任務 - 步驟 {}", i);
sleep(Duration::from_millis(300)).await;
}
// 等待所有背景任務完成
let _ = tokio::try_join!(handle1, handle2, handle3);
println!("所有任務完成");
}
use tokio::time::{sleep, Duration};
async fn fast_task() -> &'static str {
sleep(Duration::from_millis(100)).await;
"快速任務完成"
}
async fn slow_task() -> &'static str {
sleep(Duration::from_millis(500)).await;
"慢速任務完成"
}
#[tokio::main]
async fn main() {
let result = tokio::select! {
res = fast_task() => {
println!("快速任務先完成: {}", res);
res
}
res = slow_task() => {
println!("慢速任務先完成: {}", res);
res
}
};
println!("選擇的結果: {}", result);
}
讓我們建立一個實際的 HTTP 客戶端來抓取網頁內容:
cargo add reqwest --features json
use reqwest;
use tokio::time::{timeout, Duration};
async fn fetch_url(url: &str) -> Result<String, Box<dyn std::error::Error>> {
println!("正在請求: {}", url);
// 設定 5 秒超時
let response = timeout(
Duration::from_secs(5),
reqwest::get(url)
).await??;
let content = response.text().await?;
println!("完成請求: {} (長度: {})", url, content.len());
Ok(content)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let urls = vec![
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/json",
];
let start = std::time::Instant::now();
// 並行發送所有請求
let mut tasks = Vec::new();
for url in urls {
tasks.push(tokio::spawn(fetch_url(url)));
}
// 等待所有請求完成
for task in tasks {
match task.await? {
Ok(content) => println!("成功獲取內容 ({}字元)", content.len()),
Err(e) => println!("請求失敗: {}", e),
}
}
println!("總耗時: {:.2}秒", start.elapsed().as_secs_f64());
Ok(())
}
非同步程式設計中的錯誤處理需要特別注意:
use tokio::time::{sleep, Duration};
#[derive(Debug)]
enum ApiError {
NetworkError(String),
ParseError(String),
}
impl std::fmt::Display for ApiError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
ApiError::NetworkError(msg) => write!(f, "網路錯誤: {}", msg),
ApiError::ParseError(msg) => write!(f, "解析錯誤: {}", msg),
}
}
}
impl std::error::Error for ApiError {}
async fn risky_operation(success: bool) -> Result<String, ApiError> {
sleep(Duration::from_millis(100)).await;
if success {
Ok("操作成功".to_string())
} else {
Err(ApiError::NetworkError("模擬網路失敗".to_string()))
}
}
async fn handle_multiple_operations() {
let operations = vec![
tokio::spawn(risky_operation(true)),
tokio::spawn(risky_operation(false)),
tokio::spawn(risky_operation(true)),
];
for (i, operation) in operations.into_iter().enumerate() {
match operation.await {
Ok(Ok(result)) => println!("操作 {} 成功: {}", i, result),
Ok(Err(e)) => println!("操作 {} 失敗: {}", i, e),
Err(e) => println!("操作 {} 任務錯誤: {}", i, e),
}
}
}
#[tokio::main]
async fn main() {
handle_multiple_operations().await;
}
✅ 適合非同步的情況:
✅ 適合多執行緒的情況:
use tokio::task;
use std::time::Duration;
// CPU 密集型任務
fn cpu_intensive_work(n: u64) -> u64 {
(1..=n).sum()
}
// 在非同步上下文中執行 CPU 密集型任務
async fn hybrid_example() {
// I/O 任務(非同步)
let io_task = tokio::time::sleep(Duration::from_millis(100));
// CPU 任務(在執行緒池中執行)
let cpu_task = task::spawn_blocking(|| {
cpu_intensive_work(1_000_000)
});
// 等待兩個任務都完成
let (_, cpu_result) = tokio::join!(io_task, cpu_task);
match cpu_result {
Ok(sum) => println!("CPU 計算結果: {}", sum),
Err(e) => println!("CPU 計算錯誤: {}", e),
}
}
#[tokio::main]
async fn main() {
hybrid_example().await;
}
讓我們結合今天學到的知識,建立一個簡單的並行網頁爬蟲:
use reqwest;
use tokio::time::{timeout, Duration};
use std::collections::HashSet;
#[derive(Debug)]
struct CrawlResult {
url: String,
title: Option<String>,
status_code: u16,
response_time: Duration,
}
async fn crawl_page(url: String) -> Result<CrawlResult, Box<dyn std::error::Error + Send + Sync>> {
let start = std::time::Instant::now();
let response = timeout(
Duration::from_secs(10),
reqwest::get(&url)
).await??;
let status_code = response.status().as_u16();
let html = response.text().await?;
// 簡單的標題提取(實際專案中應該使用 HTML 解析器)
let title = extract_title(&html);
Ok(CrawlResult {
url,
title,
status_code,
response_time: start.elapsed(),
})
}
fn extract_title(html: &str) -> Option<String> {
// 簡化的標題提取
if let Some(start) = html.find("<title>") {
if let Some(end) = html[start + 7..].find("</title>") {
return Some(html[start + 7..start + 7 + end].trim().to_string());
}
}
None
}
async fn crawl_urls(urls: Vec<String>, max_concurrent: usize) -> Vec<CrawlResult> {
let mut results = Vec::new();
let mut tasks = Vec::new();
println!("開始爬取 {} 個網址,最大並行數: {}", urls.len(), max_concurrent);
for url in urls {
tasks.push(tokio::spawn(crawl_page(url)));
// 控制並行數量
if tasks.len() >= max_concurrent {
let completed = futures::future::join_all(tasks).await;
for result in completed {
match result {
Ok(Ok(crawl_result)) => {
println!("✓ {}: {} ({:.2}秒)",
crawl_result.url,
crawl_result.status_code,
crawl_result.response_time.as_secs_f64());
results.push(crawl_result);
}
Ok(Err(e)) => println!("✗ 爬取錯誤: {}", e),
Err(e) => println!("✗ 任務錯誤: {}", e),
}
}
tasks.clear();
}
}
// 處理剩餘的任務
if !tasks.is_empty() {
let completed = futures::future::join_all(tasks).await;
for result in completed {
match result {
Ok(Ok(crawl_result)) => {
println!("✓ {}: {} ({:.2}秒)",
crawl_result.url,
crawl_result.status_code,
crawl_result.response_time.as_secs_f64());
results.push(crawl_result);
}
Ok(Err(e)) => println!("✗ 爬取錯誤: {}", e),
Err(e) => println!("✗ 任務錯誤: {}", e),
}
}
}
results
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 需要先添加 futures 依賴:cargo add futures
let urls = vec![
"https://httpbin.org/delay/1".to_string(),
"https://httpbin.org/html".to_string(),
"https://httpbin.org/json".to_string(),
"https://example.com".to_string(),
];
let start = std::time::Instant::now();
let results = crawl_urls(urls, 3).await;
println!("\n爬取摘要:");
println!("成功爬取: {} 個網頁", results.len());
println!("總耗時: {:.2}秒", start.elapsed().as_secs_f64());
// 統計結果
let avg_response_time: f64 = results.iter()
.map(|r| r.response_time.as_secs_f64())
.sum::<f64>() / results.len() as f64;
println!("平均響應時間: {:.2}秒", avg_response_time);
Ok(())
}
別忘了新增必要的依賴:
cargo add futures
今天我們深入探索了 Rust 的非同步程式設計:
核心概念:
async/await
語法與 Future 概念實用技巧:
tokio::join!
和 tokio::select!
的並行控制tokio::spawn
創建獨立的背景任務timeout
處理超時情況實戰能力:
效能優勢:
明天我們將學習 模組系統 (Module System),探討如何組織和管理大型 Rust 專案的程式碼結構。這對於我們即將在第四週開始的部落格後端專案非常重要!
為了練習今天學到的非同步程式設計,試著完成以下挑戰:
挑戰目標:建立一個多功能的網路工具
功能需求:
進階挑戰:
這個挑戰將讓你綜合運用今天學到的所有非同步程式設計技巧!
我們明天見!